{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Regarding Warnings\n",
    "* Warning in the notebook is due to the discrepancy in package version between local and server environment\n",
    "* Readers can ignore them or update the packages to the required version through \"pip install [package]==[version]\""
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Session Connection"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "from snowflake.snowpark import Session\n",
    "from snowflake.snowpark.functions import col\n",
    "import configparser\n",
    "\n",
    "# Loading Credential From Config\n",
    "snowflake_credentials_file = '../snowflake_creds.config'\n",
    "config = configparser.ConfigParser()\n",
    "config.read(snowflake_credentials_file)\n",
    "connection_parameters = dict(config['default'])\n",
    "\n",
    "session = Session.builder.configs(connection_parameters).create()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Snowflake Objects Creation"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "session.sql(\"CREATE WAREHOUSE IF NOT EXISTS COMPUTE_WH WITH WAREHOUSE_SIZE='X-SMALL'\").collect()\n",
    "session.sql(\"CREATE DATABASE IF NOT EXISTS SNOWPARK_DEFINITIVE_GUIDE\").collect()\n",
    "session.sql(\"CREATE SCHEMA IF NOT EXISTS SNOWPARK_DEFINITIVE_GUIDE.MY_SCHEMA\").collect()\n",
    "session.sql(\"CREATE STAGE IF NOT EXISTS SNOWPARK_DEFINITIVE_GUIDE.MY_SCHEMA.MY_STAGE\").collect()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Loading Required Tables\n",
    "\n",
    "* To load the table referenced in this section\n",
    "* Refer chapter_3/chapter_3_data_load.ipynb"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "session.use_database(\"SNOWPARK_DEFINITIVE_GUIDE\"),\n",
    "session.use_schema(\"MY_SCHEMA\")\n",
    "\n",
    "purchase_history = session.table(\"PURCHASE_HISTORY\")\n",
    "campaign_info = session.table(\"CAMPAIGN_INFO\")\n",
    "complain_info = session.table(\"COMPLAINT_INFO\")\n",
    "marketing_additional = session.table(\"MARKETING_ADDITIONAL\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Data Engineering Pipelines"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Step 1 -  Joining Purchase History table with Campaign Info"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "def combine_campaign_table(purchase_history,campaign_info):\n",
    "    purchase_campaign = purchase_history.join(campaign_info, purchase_history.ID == campaign_info.ID\\\n",
    "                        ,lsuffix=\"_left\", rsuffix=\"_right\")\n",
    "    purchase_campaign = purchase_campaign.drop(\"ID_RIGHT\")\n",
    "    return purchase_campaign"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    " #### Step 2 -  Joining Purchase Campaign With Complain Info Table"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "def combine_complain_table(purchase_campaign,complain_info):\n",
    "    purchase_campaign_complain = purchase_campaign.join(complain_info, purchase_campaign[\"ID_LEFT\"] == complain_info.ID)\n",
    "    purchase_campaign_complain = purchase_campaign_complain.drop(\"ID_LEFT\")\n",
    "    return purchase_campaign_complain"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Step 3 -  Union Additional Marketing Table with Purchase Campaign Complain Data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "def union_marketing_additional_table(purchase_campaign_complain,marketing_additional):\n",
    "    final_marketing_table = purchase_campaign_complain.union_by_name(marketing_additional)\n",
    "    return final_marketing_table\n"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Store Procedure - Data Preparation"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 50,
   "metadata": {},
   "outputs": [],
   "source": [
    "from snowflake.snowpark.functions import sproc\n",
    "import snowflake\n",
    "\n",
    "def data_prep(session: Session) -> str:\n",
    "\n",
    "\n",
    "    #### Loading Required Tables\n",
    "    purchase_history = session.table(\"PURCHASE_HISTORY\")\n",
    "    campaign_info = session.table(\"CAMPAIGN_INFO\")\n",
    "    complain_info = session.table(\"COMPLAINT_INFO\")\n",
    "    marketing_additional = session.table(\"MARKETING_ADDITIONAL\")\n",
    "\n",
    "    #### Calling Step 1\n",
    "    purchase_campaign = combine_campaign_table(purchase_history,campaign_info)\n",
    "\n",
    "    #### Calling Step 2\n",
    "    purchase_campaign_complain =  combine_complain_table(purchase_campaign,complain_info)\n",
    "\n",
    "    #### Calling Step 3\n",
    "    final_marketing_data = union_marketing_additional_table(purchase_campaign_complain,marketing_additional)\n",
    "\n",
    "    #### Writing Combined Data To New Table\n",
    "    final_marketing_data.write.save_as_table(\"FINAL_MARKETING_DATA\")\n",
    "    return \"LOADED FINAL MARKETING DATA TABLE\"\n",
    "\n",
    "\n",
    "\n",
    "# Create an instance of StoredProcedure using the sproc() function\n",
    "from snowflake.snowpark.types import IntegerType,StringType\n",
    "data_prep_sproc = sproc(\n",
    "                        func= data_prep,\\\n",
    "                        replace=True,\\\n",
    "                        return_type = StringType(),\\\n",
    "                        stage_location=\"@my_stage\",\\\n",
    "                        packages=[\"snowflake-snowpark-python\"]\n",
    "                        )\n"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Stored Procedure -  Data Transformation"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 51,
   "metadata": {},
   "outputs": [],
   "source": [
    "\n",
    "def data_transform(session: Session) -> str:\n",
    "\n",
    "    #### Loading Required Tables\n",
    "    marketing_final = session.table(\"FINAL_MARKETING_DATA\")\n",
    "    market_subset = marketing_final.select(\"EDUCATION\",\"MARITAL_STATUS\",\"INCOME\")\n",
    "    market_pivot = market_subset.pivot(\"EDUCATION\",[\"Graduation\",\"PhD\",\"Master\",\"Basic\",\"2n Cycle\"]).sum(\"INCOME\")\n",
    "\n",
    "\n",
    "    #### Writing Transformed Data To New Table\n",
    "    market_pivot.write.save_as_table(\"MARKETING_PIVOT\")\n",
    "    return \"CREATED MARKETING PIVOT TABLE\"\n",
    "\n",
    "\n",
    "data_transform_sproc = sproc(\n",
    "                        func= data_transform,\\\n",
    "                        replace=True,\\\n",
    "                        return_type = StringType(),\\\n",
    "                        stage_location=\"@my_stage\",\\\n",
    "                        packages=[\"snowflake-snowpark-python\"]\n",
    "                        )\n",
    "    "
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Stored Procedure -  Data Cleanup"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 53,
   "metadata": {},
   "outputs": [],
   "source": [
    "\n",
    "def data_cleanup(session: Session) -> str:\n",
    "\n",
    "    #### Loading Required Tables\n",
    "    market_pivot = session.table(\"MARKETING_PIVOT\")\n",
    "    \n",
    "    market_drop_null = market_pivot.dropna(thresh=5)\n",
    "\n",
    "\n",
    "    #### Writing Cleaned Data To New Table\n",
    "    market_drop_null.write.save_as_table(\"MARKET_PIVOT_CLEANED\")\n",
    "    return \"CREATED CLEANED TABLE\"\n",
    "\n",
    "\n",
    "data_cleanup_sproc = sproc(\n",
    "                        func= data_cleanup,\\\n",
    "                        replace=True,\\\n",
    "                        return_type = StringType(),\\\n",
    "                        stage_location=\"@my_stage\",\\\n",
    "                        packages=[\"snowflake-snowpark-python\"]\n",
    "                        )\n",
    "    "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 60,
   "metadata": {},
   "outputs": [],
   "source": [
    "# session.sql(\"drop table FINAL_MARKETING_DATA;\").show()\n",
    "# session.sql(\"drop table MARKETING_PIVOT;\").show()\n",
    "# session.sql(\"drop table MARKET_PIVOT_CLEANED;\").show()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Calling Stored Procedure"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "'CREATED CLEANED TABLE'"
      ]
     },
     "execution_count": 9,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "\n",
    "#### Calling Data Preparation Stored Procedure\n",
    "data_prep_sproc()\n",
    "\n",
    "#### Calling Data Transformation Stored Procedure\n",
    "data_transform_sproc()\n",
    "\n",
    "#### Calling Data Cleanup Stored Procedure\n",
    "data_cleanup_sproc()\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Tasks & DAG"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# !pip install snowflake-core"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from snowflake.core import Root\n",
    "from snowflake.core.task import StoredProcedureCall\n",
    "from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation\n",
    "from snowflake.snowpark import Session\n",
    "from datetime import timedelta\n",
    "\n",
    "root = Root(session)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 57,
   "metadata": {},
   "outputs": [],
   "source": [
    "dag = DAG(\"Task_Demo\",\n",
    "          warehouse=\"COMPUTE_WH\",\n",
    "          schedule=timedelta(days=1),\n",
    "          stage_location=\"SNOWPARK_DEFINITIVE_GUIDE.MY_SCHEMA.MY_STAGE\",\n",
    "          packages=[\"snowflake-snowpark-python\"]\n",
    "          )\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 58,
   "metadata": {},
   "outputs": [],
   "source": [
    "with dag:\n",
    "\n",
    "  data_prep_task = DAGTask(\"Data_Prep\", definition=data_prep)\n",
    "  data_transform_task = DAGTask(\"Data_Transform\", definition=data_transform)\n",
    "  data_cleanup_task = DAGTask(\"Data_Cleanup\", definition=data_cleanup)\n",
    "  data_prep_task >> data_transform_task >> data_cleanup_task \n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 63,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "The version of package 'snowflake-snowpark-python' in the local environment is 1.12.1, which does not fit the criteria for the requirement 'snowflake-snowpark-python'. Your UDF might not work when the package version is different between the server and your local environment.\n",
      "The version of package 'snowflake-snowpark-python' in the local environment is 1.12.1, which does not fit the criteria for the requirement 'snowflake-snowpark-python'. Your UDF might not work when the package version is different between the server and your local environment.\n",
      "The version of package 'snowflake-snowpark-python' in the local environment is 1.12.1, which does not fit the criteria for the requirement 'snowflake-snowpark-python'. Your UDF might not work when the package version is different between the server and your local environment.\n"
     ]
    }
   ],
   "source": [
    "schema = root.databases[\"SNOWPARK_DEFINITIVE_GUIDE\"].schemas[\"MY_SCHEMA\"]\n",
    "dag_op = DAGOperation(schema)\n",
    "dag_op.deploy(dag,mode=\"orReplace\")\n",
    "dag_op.run(dag)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 64,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "RunId=1712744391720 State=SCHEDULED\n"
     ]
    }
   ],
   "source": [
    "current_runs = dag_op.get_current_dag_runs(dag)\n",
    "for r in current_runs:\n",
    "    print(f\"RunId={r.run_id} State={r.state}\")"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Logging & Trace In Snowpark"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Creating and Setting Up Event Table"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "session.sql('''CREATE EVENT TABLE MY_EVENTS;''').show()\n",
    "session.sql('''ALTER ACCOUNT SET EVENT_TABLE = SNOWPARK_DEFINITIVE_GUIDE.MY_SCHEMA.MY_EVENTS;''').show()\n",
    "session.sql('''alter session set log_level = INFO;''').show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "session.sql('''SELECT * FROM SNOWPARK_DEFINITIVE_GUIDE.MY_SCHEMA.MY_EVENTS;''').show()\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "session.sql ('''CREATE STREAM EVENT_APPEND ON EVENT TABLE MY_EVENTS APPEND_ONLY=TRUE;''').show()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Data Prep Stored Procedure with Info Logs"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 33,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "<snowflake.snowpark.stored_procedure.StoredProcedure at 0x2088c571940>"
      ]
     },
     "execution_count": 33,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from snowflake.snowpark.functions import sproc\n",
    "import logging\n",
    "\n",
    "def data_prep(session: Session):\n",
    "\n",
    "    ## Initializing Logger\n",
    "    logger = logging.getLogger(\"My_Logger\")\n",
    "    logger.info(\"Data Preparation Pipeline Starts\")\n",
    "    \n",
    "    #### Loading Required Tables\n",
    "    logger.info(\"Loading Required Tables\")\n",
    "\n",
    "    purchase_history = session.table(\"PURCHASE_HISTORY\")\n",
    "    campaign_info = session.table(\"CAMPAIGN_INFO\")\n",
    "    complain_info = session.table(\"COMPLAINT_INFO\")\n",
    "    marketing_additional = session.table(\"MARKETING_ADDITIONAL\")\n",
    "\n",
    "    #### Calling Step 1\n",
    "    purchase_campaign = combine_campaign_table(purchase_history,campaign_info)\n",
    "\n",
    "    logger.info(\"Joined Purchase and Campaign Tables\")\n",
    "\n",
    "    #### Calling Step 2\n",
    "    purchase_campaign_complain =  combine_complain_table(purchase_campaign,complain_info)\n",
    "\n",
    "    logger.info(\"Joined Complain Table\")\n",
    "\n",
    "    #### Calling Step 3\n",
    "    final_marketing_data = union_marketing_additional_table(purchase_campaign_complain,marketing_additional)\n",
    "\n",
    "    logger.info(\"Final Marketing Data Created\")\n",
    "\n",
    "    #### Writing Combined Data To New Table\n",
    "    final_marketing_data.write.save_as_table(\"FINAL_MARKETING_DATA\",mode=\"overwrite\")\n",
    "\n",
    "    logger.info(\"Final Marketing Data Table Created\")    \n",
    "    return \"LOADED FINAL MARKETING DATA TABLE\"\n",
    "\n",
    "\n",
    "##################################################################\n",
    "## Register Stored Procedure in Snowflake\n",
    "\n",
    "### Add packages and data types\n",
    "from snowflake.snowpark.types import StringType\n",
    "session.add_packages('snowflake-snowpark-python')\n",
    "\n",
    "### Upload Stored Procedure to Snowflake\n",
    "session.sproc.register(\n",
    "    func = data_prep\n",
    "  , return_type = StringType()\n",
    "  , input_types = []\n",
    "  , is_permanent = True\n",
    "  , name = 'DATA_PREP_SPROC_LOG'\n",
    "  , replace = True\n",
    "  , stage_location = '@MY_STAGE'\n",
    ")\n"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Excecuting Stored Procedure - Logging"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 37,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "-------------------------------------\n",
      "|\"DATA_PREP_SPROC_LOG\"              |\n",
      "-------------------------------------\n",
      "|LOADED FINAL MARKETING DATA TABLE  |\n",
      "-------------------------------------\n",
      "\n"
     ]
    }
   ],
   "source": [
    "\n",
    "session.sql(''' Call DATA_PREP_SPROC_LOG()''').show()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "##### Retriving the Logs"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 44,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "------------------------------------------------------\n",
      "|\"SEVERITY\"  |\"MESSAGE\"                              |\n",
      "------------------------------------------------------\n",
      "|\"INFO\"      |\"Data Preparation Pipeline Starts\"     |\n",
      "|\"INFO\"      |\"Loading Required Tables\"              |\n",
      "|\"INFO\"      |\"Joined Purchase and Campaign Tables\"  |\n",
      "|\"INFO\"      |\"Joined Complain Table\"                |\n",
      "|\"INFO\"      |\"Final Marketing Data Created\"         |\n",
      "|\"INFO\"      |\"Final Marketing Data Table Created\"   |\n",
      "------------------------------------------------------\n",
      "\n"
     ]
    }
   ],
   "source": [
    "session.sql(\"\"\"SELECT RECORD['severity_text'] AS SEVERITY,VALUE AS MESSAGE\n",
    "        FROM MY_EVENTS\n",
    "        WHERE SCOPE['name'] = 'My_Logger'\n",
    "        AND RECORD_TYPE = 'LOG'\"\"\").show()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Handling Exceptions"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 49,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "<snowflake.snowpark.stored_procedure.StoredProcedure at 0x192d394d190>"
      ]
     },
     "execution_count": 49,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "def data_transform(session: Session):\n",
    "    try:\n",
    "\n",
    "        ## Initializing Logger\n",
    "        logger = logging.getLogger(\"Data_Transform_Logger\")\n",
    "        logger.info(\"Data Transformation Pipeline Starts\")\n",
    "\n",
    "        ## Pivoting Process \n",
    "        marketing_final = session.table(\"FINAL_MARKETING_DATA\")\n",
    "        market_subset = marketing_final.select(\"EDUCATION\",\"MARITAL_STATUS\",\"INCOME\")\n",
    "        market_pivot = market_subset.pivot(\"EDUCATION\",[\"Graduation\",\"PhD\",\"Master\",\"Basic\",\"2n Cycle\"]).sum(\"INCOME\")\n",
    "\n",
    "        #### Writing Transformed Data To New Table\n",
    "        market_pivot.write.save_as_table(\"MAREKTING_PIVOT\", mode=\"overwrite\")\n",
    "        logger.log(\"MARKETING PIVOT TABLE CREATED\")\n",
    "        return \"CREATED MARKETING PIVOT TABLE\"\n",
    "\n",
    "    except Exception as err:\n",
    "        logger.error(\"Logging an error from Python handler: \")\n",
    "        logger.error(err)\n",
    "        return \"ERROR\"\n",
    "\n",
    "\n",
    "##################################################################\n",
    "## Register Stored Procedure in Snowflake\n",
    "\n",
    "### Add packages and data types\n",
    "from snowflake.snowpark.types import StringType\n",
    "session.add_packages('snowflake-snowpark-python')\n",
    "\n",
    "### Upload Stored Produre to Snowflake\n",
    "session.sproc.register(\n",
    "    func = data_transform\n",
    "  , return_type = StringType()\n",
    "  , input_types = []\n",
    "  , is_permanent = True\n",
    "  , name = 'DATA_TRANSFORM_SPROC_LOG_ERROR'\n",
    "  , replace = True\n",
    "  , stage_location = '@MY_STAGE'\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 50,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "------------------------------------\n",
      "|\"DATA_TRANSFORM_SPROC_LOG_ERROR\"  |\n",
      "------------------------------------\n",
      "|ERROR                             |\n",
      "------------------------------------\n",
      "\n"
     ]
    }
   ],
   "source": [
    "session.sql(''' Call DATA_TRANSFORM_SPROC_LOG_ERROR()''').show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 60,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(SEVERITY='\"INFO\"', MESSAGE='\"Data Preparation Pipeline Starts\"'),\n",
       " Row(SEVERITY='\"ERROR\"', MESSAGE='\"Logging an error from Python handler: \"'),\n",
       " Row(SEVERITY='\"ERROR\"', MESSAGE='\"(1304): 01aea100-0001-0458-0001-9ba600015746: 002002 (42710): 01aea100-0001-0458-0001-9ba600015746: SQL compilation error:\\\\nObject \\'MAREKTING_PIVOT\\' already exists.\"'),\n",
       " Row(SEVERITY='\"INFO\"', MESSAGE='\"Data Transformation Pipeline Starts\"'),\n",
       " Row(SEVERITY='\"ERROR\"', MESSAGE='\"Logging an error from Python handler: \"'),\n",
       " Row(SEVERITY='\"ERROR\"', MESSAGE='\"(1304): 01aea101-0001-0458-0001-9ba60001576a: 002002 (42710): 01aea101-0001-0458-0001-9ba60001576a: SQL compilation error:\\\\nObject \\'MAREKTING_PIVOT\\' already exists.\"')]"
      ]
     },
     "execution_count": 60,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "session.sql(\"\"\"SELECT RECORD['severity_text'] AS SEVERITY,VALUE AS MESSAGE\n",
    "        FROM MY_EVENTS\n",
    "        WHERE SCOPE['name'] = 'Data_Transform_Logger'\n",
    "        AND RECORD_TYPE = 'LOG'\"\"\").collect()"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Setting Up Event Traces"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 34,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "------------------------------------\n",
      "|\"status\"                          |\n",
      "------------------------------------\n",
      "|Statement executed successfully.  |\n",
      "------------------------------------\n",
      "\n"
     ]
    }
   ],
   "source": [
    "#session.sql(\"alter session set trace_level = ON_EVENT;\").show()\n",
    "session.sql(\"ALTER SESSION SET TRACE_LEVEL = ALWAYS;\").show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 40,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "The version of package snowflake-telemetry-python in the local environment is 0.2.0, which does not fit the criteria for the requirement snowflake-telemetry-python. Your UDF might not work when the package version is different between the server and your local environment\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "<snowflake.snowpark.stored_procedure.StoredProcedure at 0x2088c66d340>"
      ]
     },
     "execution_count": 40,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "\n",
    "def data_cleanup(session: Session):\n",
    "\n",
    "    #### Loading Telemetry Package\n",
    "    from snowflake import telemetry\n",
    "\n",
    "    #### Loading Required Tables\n",
    "    market_pivot = session.table(\"MARKETING_PIVOT\")\n",
    "    \n",
    "    #### Adding Trace Event\n",
    "    telemetry.add_event(\"data_cleanup\", {\"table_name\": \"MARKETING_PIVOT\", \"count\": market_pivot.count()})\n",
    "\n",
    "    #### Dropping Null\n",
    "    market_drop_null = market_pivot.dropna(thresh=5)\n",
    "\n",
    "    #### Writing Cleaned Data To New Table\n",
    "    market_drop_null.write.save_as_table(\"MARKET_PIVOT_CLEANED\", mode=\"overwrite\")\n",
    "\n",
    "    #### Adding Trace Event\n",
    "    telemetry.add_event(\"data_cleanup\", {\"table_name\": \"MARKET_PIVOT_CLEANED\", \"count\": market_drop_null.count()})\n",
    "\n",
    "    return \"CREATED CLEANED TABLE\"\n",
    "\n",
    "\n",
    "##################################################################\n",
    "## Register Stored Procedure in Snowflake\n",
    "\n",
    "### Add packages and data types\n",
    "from snowflake.snowpark.types import StringType\n",
    "session.add_packages('snowflake-snowpark-python', 'snowflake-telemetry-python')\n",
    "\n",
    "### Upload Stored Procedure to Snowflake\n",
    "session.sproc.register(\n",
    "    func = data_cleanup\n",
    "  , return_type = StringType()\n",
    "  , input_types = []\n",
    "  , is_permanent = True\n",
    "  , name = 'DATA_CLEANUP_SPROC_TRACE'\n",
    "  , replace = True\n",
    "  , stage_location = '@MY_STAGE'\n",
    ")  "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 41,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "------------------------------\n",
      "|\"DATA_CLEANUP_SPROC_TRACE\"  |\n",
      "------------------------------\n",
      "|CREATED CLEANED TABLE       |\n",
      "------------------------------\n",
      "\n"
     ]
    }
   ],
   "source": [
    "session.sql(''' Call DATA_CLEANUP_SPROC_TRACE()''').show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 57,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "----------------------------------------------------------------------------------------------------------------------------------------------------------\n",
      "|\"TIME\"                      |\"HANDLER_NAME\"                                  |\"HANDLER_TYPE\"  |\"EVENT_NAME\"    |\"ATTRIBUTES\"                            |\n",
      "----------------------------------------------------------------------------------------------------------------------------------------------------------\n",
      "|2023-08-29 16:12:14.767718  |\"DATA_CLEANUP_SPROC_TRACE():VARCHAR(16777216)\"  |\"PROCEDURE\"     |\"data_cleanup\"  |{                                       |\n",
      "|                            |                                                |                |                |  \"count\": 8,                           |\n",
      "|                            |                                                |                |                |  \"table_name\": \"MARKETING_PIVOT\"       |\n",
      "|                            |                                                |                |                |}                                       |\n",
      "|2023-08-29 16:12:15.707074  |\"DATA_CLEANUP_SPROC_TRACE():VARCHAR(16777216)\"  |\"PROCEDURE\"     |\"data_cleanup\"  |{                                       |\n",
      "|                            |                                                |                |                |  \"count\": 5,                           |\n",
      "|                            |                                                |                |                |  \"table_name\": \"MARKET_PIVOT_CLEANED\"  |\n",
      "|                            |                                                |                |                |}                                       |\n",
      "----------------------------------------------------------------------------------------------------------------------------------------------------------\n",
      "\n"
     ]
    }
   ],
   "source": [
    "session.sql(\"\"\" SELECT\n",
    "  TIMESTAMP as time,\n",
    "  RESOURCE_ATTRIBUTES['snow.executable.name'] as handler_name,\n",
    "  RESOURCE_ATTRIBUTES['snow.executable.type'] as handler_type,\n",
    "  RECORD['name'] as event_name,\n",
    "  RECORD_ATTRIBUTES as attributes\n",
    "FROM\n",
    "  MY_EVENTS\n",
    "WHERE\n",
    "  EVENT_NAME  ='data_cleanup'\n",
    "\"\"\").show(2)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": 36,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "-----------------------------------------\n",
      "|\"status\"                               |\n",
      "-----------------------------------------\n",
      "|MARKETING_PIVOT successfully dropped.  |\n",
      "-----------------------------------------\n",
      "\n",
      "----------------------------------------------\n",
      "|\"status\"                                    |\n",
      "----------------------------------------------\n",
      "|MARKET_PIVOT_CLEANED successfully dropped.  |\n",
      "----------------------------------------------\n",
      "\n"
     ]
    }
   ],
   "source": [
    "session.sql(\"drop table MARKET_PIVOT_CLEANED;\").show()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "def_gui_3.8_env",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.16"
  },
  "orig_nbformat": 4
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
